/*Проверка конфигурации PG*/
SELECT  
    name,
    setting,
    context
FROM pg_settings
WHERE name IN ('wal_level','max_wal_senders','max_replication_slots');

CREATE ROLE kafka_replica WITH superuser login PASSWORD 'kafka_replica';
CREATE SCHEMA debezium_info;
CREATE TABLE debezium_info.heartbeat (id serial PRIMARY KEY, date_load TIMESTAMPTZ DEFAULT NOW() );

CREATE TABLE public.fact_cost (
                id_fact_cost serial NOT NULL PRIMARY KEY,
                transaction_ts timestamp NOT NULL,
                sum_cost NUMERIC(10,3) NOT NULL
);

/*Выполняется на OLTP базе PG 14*/
WITH insert_100 as(
select 
    generate_series(1, 100, 1) AS test, 
    timestamp '2021-01-01 00:00:00' +
       random() * (timestamp '2021-12-01 00:00:00' -
                   timestamp '2021-01-01 23:59:59') AS transaction_ts,
    random()*1000::NUMERIC(10,3) AS sum_cost)
INSERT  INTO public.fact_cost (transaction_ts, sum_cost)
SELECT transaction_ts,
        sum_cost
FROM insert_100;


SELECT *
FROM debezium_info.heartbeat;

SELECT *
FROM public.fact_cost;


CREATE PUBLICATION dbz_publication 
FOR TABLE debezium_info.heartbeat,public.fact_cost;


SELECT * 
FROM pg_replication_slots;



DELETE FROM public.fact_cost
WHERE id_fact_cost < 10


UPDATE public.fact_cost
SET  sum_cost=-10
WHERE id_fact_cost=100;

ALTER TABLE public.fact_cost ADD column1 varchar NULL;

UPDATE public.fact_cost
SET  column1= 'test'
WHERE id_fact_cost=100;


SELECT *
FROM public.fact_cost

/*Выполняется на реплицируемой PG 9*/
SELECT *
FROM public.fact_cost;

SELECT *
FROM public.fact_cost
WHERE id_fact_cost=100;

/*command line*/
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @postgresql-debezium.json

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @pg_jdbc-sink.json

curl -i -X DELETE localhost:8083/connectors/jdbc-sink/

curl -i -X DELETE localhost:8083/connectors/pg-get-data-connector/

curl -i http://localhost:8083/connectors/jdbc-sink/status 

/*Важно*/
Обратить на название топика и название БД sink коннекторе и в "database.server.name": "db_pg_oltp" они должны называться одинаково! Иначе cross database error!
Название топика ничинается на "database.server.name": "db_pg_oltp"
В docker-compose.yaml идет ссылка на сборку из контекста context: debezium-jdbc!!! Там возможно притаится ошибка. Не отрабатывает heartbeat.



